feat: add hex scalar function#449
Conversation
| fn hex_bytes_to_string(bytes: &[u8]) -> Result<String, std::fmt::Error> { | ||
| let mut hex_string = String::with_capacity(bytes.len() * 2); | ||
| for byte in bytes { | ||
| write!(&mut hex_string, "{:01X}", byte)?; |
There was a problem hiding this comment.
hmm, I think it should be :02X instead?
There was a problem hiding this comment.
By the way, there's hex_encode in scalar_func.rs, maybe we should moved that into this hex.rs instead, so related methods are grouped together.
There was a problem hiding this comment.
I'll look into :02X, but it seems to pass tests w/ :01X. Also on hex_encode, I wonder if the hash functions should go in scalar_funcs/hash.rs or something, then hex_encode could go there? Personal preference, but I think it'd be nice if scalar_funcs was refactored so it just had create_comet_physical_fun and the individual function impls went in scalar_funcs/.
There was a problem hiding this comment.
I think I can move sha2/md5 related to scalar_funcs in a follow-up pr. It’s indeed a better place. For the hex_encode, maybe we need to extract it to hex_utils or simply in hex.rs, and make that crate public. I don’t think it’s a blocker to this PR. Just saying it so that it can be addressed later.
| // ints | ||
| checkSparkAnswerAndOperator("SELECT hex(_2), hex(_3), hex(_4), hex(_5) FROM tbl") | ||
|
|
||
| // uints, uint8 and uint16 not working yet |
There was a problem hiding this comment.
I remember unsigned ints currently do not work in other places https://github.com/apache/datafusion-comet/blob/main/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala#L97
I think it is ok to leave it as TODO
|
@kazuyukitanimura @advancedxy ... re-requesting reviews from you two, please. I've updated the code to support dictionaries and removed some of the finer int types. Barring something happening on intel mac, the tests look to pass here: https://github.com/tshauck/arrow-datafusion-comet/actions/runs/9183637020.... thanks |
| if num >= 0 { | ||
| format!("{:X}", num) | ||
| } else { | ||
| format!("{:016X}", num as u64) |
There was a problem hiding this comment.
is this needed?
If the num is a negative i64, there's a leading 1 in its binary representation. So, there's no need to add width into the format?
Please correct me if I'm wrong here, we can simply write format!("{:X}", num) here?
| } | ||
|
|
||
| fn hex_bytes_to_string(bytes: &[u8]) -> Result<String, std::fmt::Error> { | ||
| let mut hex_string = String::with_capacity(bytes.len() * 2); |
There was a problem hiding this comment.
Hmm. I don't think we should double the size for string here. The bytes are already hexed bytes, aren't they?
And by the way, I think hex_bytes and hex_bytes_to_string should be combined into a single function, something like:
fn hex_bytes(bytes: &[u8]) -> String {
let length = bytes.len();
let mut hex_string = String::with_capacity(bytes.len() * 2);
let mut i = 0;
while i < length {
write!(&mut hex_string, "{:X}", (bytes[i] & 0xF0) >> 4).unwrap();
write!(&mut hex_string, "{:X}", bytes[i] & 0x0F).unwrap();
// or simply write
// write!(&mut hex_string, "{:02X}", bytes[i]).unwrap();
i += 1;
}
hex_string
}| let hexed: Vec<Option<String>> = array | ||
| .iter() | ||
| .map(|v| v.map(|v| hex_bytes_to_string(&hex_string(v))).transpose()) | ||
| .collect::<Result<_, _>>()?; | ||
|
|
||
| let string_array = StringArray::from(hexed); |
There was a problem hiding this comment.
I think it can be simplified to something like below, other places apply to this as well. Assuming hex_string already returns a string.
| let hexed: Vec<Option<String>> = array | |
| .iter() | |
| .map(|v| v.map(|v| hex_bytes_to_string(&hex_string(v))).transpose()) | |
| .collect::<Result<_, _>>()?; | |
| let string_array = StringArray::from(hexed); | |
| let hexed_array: StringArray = array | |
| .iter() | |
| .map(|v| v.map(|v| hex_string(v)) | |
| .collect(); | |
you can refer src/execution/datafusion/expressions/scalar_funcs.rs:685 for similar code.
|
|
||
| Ok(ColumnarValue::Array(Arc::new(string_array))) | ||
| } | ||
| DataType::Dictionary(_, value_type) if matches!(**value_type, DataType::Int64) => { |
There was a problem hiding this comment.
Not totally related to this PR. But this seems a bit unexpected. I believe many other codes in scalar_funcs and datafusion doesn't handle the dictionary type specially, such as spark_ceil, spark_rpad and spark_murmur3_hash or other functions registered in DataFusion. If we are going to handle dictionary types, we should also update these functions too? Or we should do it in a more systematic way, such as flatten the dictionary types first.
cc @sunchao @andygrove and @viirya for more inputs.
There was a problem hiding this comment.
I also ran into issues with this in handling cast operations and ended up flattening the dictionary type first, but just for cast expressions. I agree that we need to look at this and see if there is a more systematic approach we can use.
There was a problem hiding this comment.
The trick we used to use is adding Cast(child) in QueryPlanSerde that unpacks dictionaries.
We should consolidate the unpacking logic, otherwise we will need to add it every function. Or until that happens we can workaround with Cast
There was a problem hiding this comment.
The trick we used to use is adding Cast(child) in QueryPlanSerde that unpacks dictionaries.
Do you have any concrete examples of how this works by any chance? I remember I saw some unnecessary cast operation in the query plan serde file, didn't realize it was for unpacking dictionaries.
We should consolidate the unpacking logic, otherwise we will need to add it every function. Or until that happens we can workaround with Cast
Yes, maybe this logic should added in the rust planner side, which can unpack the dictionary automatically if it knows the expression cannot handle dictionary types.
There was a problem hiding this comment.
Do you have any concrete examples of how this works by any chance?
Hmm, it's been a while, I cannot find right away...
BTW my comment above is not a blocker. Since this PR already implemented it, we can follow up separately
There was a problem hiding this comment.
Not only in Comet. I do remember in DataFusion not all functions/expressions support dictionary types. I suspect if there is a systematic approach to deal with it, because I think there is no general approach to process dictionary-encoded inputs for different functions/expressions. For example, some functions can directly work on dictionary values and re-create a new dictionary with updated values, but for some functions, it is impossible so it needs to unpack dictionary first.
There was a problem hiding this comment.
BTW now I remember unpack_dictionary_type() unpacks early for primitive types, so like the one for Int64 can be omitted.
|
I think I made the updates requested in the latest round. I left the dictionary handling the same, but I can look into flattening the dictionary specific to hex if you guys think it's a good idea. Thanks |
| withParquetTable(path.toString, "tbl") { | ||
| // _9 and _10 (uint8 and uint16) not supported | ||
| checkSparkAnswerAndOperator( | ||
| "SELECT hex(_1), hex(_2), hex(_3), hex(_4), hex(_5), hex(_6), hex(_7), hex(_8), hex(_11), hex(_12), hex(_13), hex(_14), hex(_15), hex(_16), hex(_17), hex(_18), hex(_19), hex(_20) FROM tbl") |
There was a problem hiding this comment.
Let's also add scalar tests too
There was a problem hiding this comment.
Thanks for bringing this up. I guess I thought scalars weren't supported yet (maybe for UDFs only)? From what I could tell none of the other tests test scalars, i.e. most of them insert into a table, then query the table. E.g.,
test("Chr") {
Seq(false, true).foreach { dictionary =>
withSQLConf(
"parquet.enable.dictionary" -> dictionary.toString,
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
val table = "test"
withTable(table) {
sql(s"create table $table(col varchar(20)) using parquet")
sql(
s"insert into $table values('65'), ('66'), ('67'), ('68'), ('65'), ('66'), ('67'), ('68')")
checkSparkAnswerAndOperator(s"SELECT chr(col) FROM $table")
}
}
}
}Also running scalar tests seems to fail for the other UDFs I spot checked, e.g.,
- trim *** FAILED *** (7 seconds, 507 milliseconds)
Expected only Comet native operators, but found Project.
plan: *(1) Project [HI AS upper(trim(hi))#125]
+- *(1) Scan OneRowRelation[] (CometTestBase.scala:186)
Should they be working w/ UDFs, and it's just nothing else tests for them?
There was a problem hiding this comment.
We have several scalar tests i.e. ceil and floor, scalar decimal arithmetic operations, etc. I agree that the existing coverage is not great.
Since you have the code to handle scalar, it is best to test them. Or it is also okay to disable scalar for now and put it as TODO.
There was a problem hiding this comment.
Thanks, I agree it's best to test since they're there. I just genuinely thought it wasn't supported given the test coverage, the errors on the other functions, etc. I'll have a look at adding some tests...
There was a problem hiding this comment.
I'm realizing that I'm not going to have time next week and didn't expect this PR to take this long, so I've removed the scalar handling for now and hopefully can follow an example in the future. 88bdcde
There was a problem hiding this comment.
I think we can keep the scalar functions, and it should be pretty straightforward to test scalar input?
Namely, it should be something like:
select hex(10), hex('abc')
The constant literal should be encoded as a ScalarValue.
There was a problem hiding this comment.
I tried that already and got test failures because the native function isn't being recognized w/ scalars. The same output I mentioned w/ trim above.
There was a problem hiding this comment.
Sounds like QueryPlanSerde is failing the case match.
Wondering if you already have tried "spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding" ?
Also right now, since the scalar code was removed from rust, it will fail if the native code happen to find scalar values...
There was a problem hiding this comment.
Thanks, yes, I did also try disabling ConstantFolding to no avail.
There was a problem hiding this comment.
hex(literal) should be evaluated to literal early in Spark optimizer. So it won't hit the native hex. I wonder what test failure you saw?
advancedxy
left a comment
There was a problem hiding this comment.
Left two comments, which are both not blockers.
|
|
||
| Ok(ColumnarValue::Array(Arc::new(string_array))) | ||
| } | ||
| DataType::Dictionary(_, value_type) if matches!(**value_type, DataType::Int64) => { |
There was a problem hiding this comment.
The trick we used to use is adding Cast(child) in QueryPlanSerde that unpacks dictionaries.
Do you have any concrete examples of how this works by any chance? I remember I saw some unnecessary cast operation in the query plan serde file, didn't realize it was for unpacking dictionaries.
We should consolidate the unpacking logic, otherwise we will need to add it every function. Or until that happens we can workaround with Cast
Yes, maybe this logic should added in the rust planner side, which can unpack the dictionary automatically if it knows the expression cannot handle dictionary types.
| withParquetTable(path.toString, "tbl") { | ||
| // _9 and _10 (uint8 and uint16) not supported | ||
| checkSparkAnswerAndOperator( | ||
| "SELECT hex(_1), hex(_2), hex(_3), hex(_4), hex(_5), hex(_6), hex(_7), hex(_8), hex(_11), hex(_12), hex(_13), hex(_14), hex(_15), hex(_16), hex(_17), hex(_18), hex(_19), hex(_20) FROM tbl") |
There was a problem hiding this comment.
I think we can keep the scalar functions, and it should be pretty straightforward to test scalar input?
Namely, it should be something like:
select hex(10), hex('abc')
The constant literal should be encoded as a ScalarValue.
| let keys = dict.keys().clone(); | ||
| let mut new_keys = Vec::with_capacity(values.len()); | ||
|
|
||
| for key in keys.iter() { | ||
| let key = key.map(|k| values[k as usize].clone()).unwrap_or(None); | ||
| new_keys.push(key); | ||
| } | ||
|
|
||
| let string_array_values = StringArray::from(new_keys); |
There was a problem hiding this comment.
Hmm, why we don't re-create a new dictionary array of string? Hex is a kind of function that doesn't change dictionary-encoded mapping. You can simply take new values and existing keys to create a new dictionary array.
There was a problem hiding this comment.
We have the same issue currently in cast from string to other types.
@viirya Do we have an example somewhere of converting a dictionary array without unpacking?
There was a problem hiding this comment.
For this case, we can call
dictionary.with_values(Arc::new(values));
to construct a new dictionary array with new values.
There was a problem hiding this comment.
@viirya, thanks, where I'm a bit unclear is how to have the function return type also be a dictionary. The data type for the hex expression seems to be a Utf8, so I get org.apache.comet.CometNativeException: Arrow error: Invalid argument error: column types must match schema types, expected Utf8 but found Dictionary(Int32, Utf8) after making the update to use with_values.
There was a problem hiding this comment.
@viirya Am I correct in understanding that this PR is functionally correct but just not as efficient as possible? Perhaps we could consider having a follow-up issue to optimize this to rewrite the dictionary? It seems that we don't have a full example of dictionary rewrite for contributors to follow.
There was a problem hiding this comment.
@andygrove Yes. It's not efficient but should be correct.
| } | ||
| test("unhex") { |
There was a problem hiding this comment.
style:
| } | |
| test("unhex") { | |
| } | |
| test("unhex") { |
viirya
left a comment
There was a problem hiding this comment.
I left a few comments including ones that need to be addressed before merging this, e.g. #449 (comment).
| let string_array_values = StringArray::from(new_keys); | ||
| Ok(ColumnarValue::Array(Arc::new(string_array_values))) | ||
| } | ||
| DataType::Dictionary(_, value_type) if matches!(**value_type, DataType::Binary) => { |
There was a problem hiding this comment.
This part of dictionary arrays handling can be rewritten to reduce duplicate actually. It can be follow-up though.
* feat: add hex scalar function * test: change hex test to use makeParquetFileAllTypes, support more types * test: add more columns to spark test * refactor: remove extra rust code * feat: support dictionary * fix: simplify hex_int64 * refactor: combine functions for hex byte/string * refactor: update vec collection * refactor: refactor hex to support byte ref * style: fix clippy * refactor: remove scalar handling * style: new lines in expression test file * fix: handle large strings
Which issue does this PR close?
Related to #341.
Rationale for this change
I recently added
unhexso this PR addshex. It's another scalar function that isn't yet implemented in Comet.I decided to do a native version here because datafusion has an
to_hex, but it has different functionality. E.g.to_hex(-1)returnsffffffffffffffffin datafusion vshex(-1)returningFFFFFFFFFFFFFFFFin spark. Sparkhexalso supports strings, but datafusion and postgres do not. I'm happy to start the discussion if it seems it could be merged upstream.What changes are included in this PR?
I added the
hexscalar function.How are these changes tested?
Yes, I've added tests to the rust side as well as spark sql based tests in scala.